package com.dm.cloud.utils.distributedlock;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.util.Strings;
import reactor.core.Disposable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 *
 * 基于ZK的分布式锁
 * 利用ZK的监听特性，锁释放试可以被监听，
 * 应用异常中断ZK自动删除NODE文件释放锁。
 * 但是性能较差、因为需要动态生成删除NODE文件
 * ZK节点之间的网络异常可能会导致并发现象（可能性很小）
 *
 * 1、获取指定节点的锁
 * 2、获取到锁则执行操作 操作完成后释放当前锁最小节点
 * 3、获取不到锁则对锁进行监听或者直接释放 （fifo公平锁）
 *
 */
@Slf4j
public class ZookeeperDSLock implements Disposable {
    private CountDownLatch cdl = new CountDownLatch(1);

    //IP PORT
    private String IP_PORT ;
    //父节点
    private String PARENT_NODE ;
    //连接超时
    private int connectionTimeout;
    //会话超时
    private int sessionTimeout;

    private ZkClient zkClient ;

    //记录紧前节点
    private volatile String beforePath;

    //记录当前节点
    private volatile String path;

    //记录节点目录
    private volatile List<String> children = new ArrayList<>();

    public static class Builder {
        //初始化节点和zk地址
        private String ipPort= "127.0.0.1:2181";
        private String pNode= "/LOCKNODE";
        private int connectionTimeout = 3000;
        private int sessionTimeout = 3000;
        public Builder() {
        }

        public void setIpPort(String ipPort){
            this.ipPort=ipPort;
        }

        public void setPNode(String pNode){
            this.pNode=pNode;
        }

        public void setConnectionTimeout(int connectionTimeout){
            this.connectionTimeout=connectionTimeout;
        }

        public void setSessionTimeout(int sessionTimeout){
            this.sessionTimeout=sessionTimeout;
        }

        public ZookeeperDSLock build(){
            return new ZookeeperDSLock(this);
        }
    }

    private ZookeeperDSLock(Builder builder) {
        //初始化信息
        this.IP_PORT=builder.ipPort;
        this.PARENT_NODE=builder.pNode;
        this.connectionTimeout = builder.connectionTimeout;
        this.sessionTimeout = builder.sessionTimeout;

        try {
            zkClient = new ZkClient(IP_PORT, sessionTimeout, connectionTimeout);
        }catch (Exception ex){
            throw new RuntimeException("ZkClient 初始化失败:"+ex.getMessage());
        }

        //父节点不存在则先创建
        if (!zkClient.exists(PARENT_NODE)) {
            try {
                zkClient.createPersistent(PARENT_NODE);
            }catch (Exception ex){
                throw new RuntimeException("ZkClient 父节点创建失败:");
            }
        }
    }

    //排队等到锁 执行任务
    public void doOnWaitLock(String lockObj,ZKDSWorker zkdsWorker) {

        if(getLock(lockObj)){
            // 获得锁
            zkdsWorker.todo();
            //释放锁
            this.unlock(lockObj);
        }else{
            //未得到锁 对节点进行监听
            waitForLock(lockObj,zkdsWorker);
        }
    }

    //获得锁
    public synchronized boolean lock(String lockObj) {
        // 当前的是最小节点就返回加锁成功
        if (getLock(lockObj)) {
            return true;
        } else {
            //删掉当前节点 没有获取锁的话 删除临时节点
            zkClient.deleteRecursive(path);
            return false;
        }
    }

    //释放锁
    public void unlock(String lockObj) {
        if (Strings.isBlank(path)) {
            path = zkClient.createEphemeralSequential(PARENT_NODE + "/", lockObj);
        }
        //遍历子节点删除所有子节点后再删除目标目录
        zkClient.deleteRecursive(path);
    }

    //资源释放
    @Override
    public void dispose() {
        zkClient.close();
    }

    //判断是不是可以获得到锁
    private boolean getLock(String lockObj){

        // 创建自己的临时节点
        if (Strings.isBlank(path)) {
            path = zkClient.createEphemeralSequential(PARENT_NODE + "/", lockObj);
        }

        //最小节点可以获得锁
        return isMinNode();
    }

    //判断当前任务创建的节点是不是最小节点
    private boolean isMinNode(){

        // 对节点排序
        children = zkClient.getChildren(PARENT_NODE);
        Collections.sort(children);

        return path.equals(PARENT_NODE + "/" + children.get(0));
    }

    //等待锁
    private void waitForLock(String lockObj,ZKDSWorker zkdsWorker) {

        // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
        int i = Collections.binarySearch(children, path.substring(PARENT_NODE.length() + 1));
        beforePath = PARENT_NODE + "/" + children.get(i - 1);

        IZkDataListener listener = new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
            }

            public void handleDataDeleted(String s) throws Exception {
                cdl.countDown();
            }
        };
        // 监听
        this.zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)) {
            try {
                //等待加锁
                cdl.await();

                //判断是不是最小节点
                if(isMinNode()){
                    zkdsWorker.todo();
                    // 最后释放监听
                    zkClient.unsubscribeDataChanges(beforePath, listener);
                    this.unlock(lockObj);
                }else{
                    // 释放监听
                    zkClient.unsubscribeDataChanges(beforePath, listener);
                    //对节点重新进行监听
                    waitForLock(lockObj,zkdsWorker);
                }
            } catch (InterruptedException e) {
                log.error("加锁失败",e);
            }
        }
    }
}
